/*
 * Copyright 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

@file:JvmName("SnapshotStateKt")
@file:JvmMultifileClass

package androidx.compose.runtime

import androidx.collection.MutableScatterSet
import androidx.compose.runtime.collection.fastAny
import androidx.compose.runtime.snapshots.ReaderKind
import androidx.compose.runtime.snapshots.Snapshot
import androidx.compose.runtime.snapshots.StateObjectImpl
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.jvm.JvmMultifileClass
import kotlin.jvm.JvmName
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.withContext

/**
 * Collects values from this [StateFlow] and represents its latest value via [State]. The
 * [StateFlow.value] is used as an initial value. Every time there would be new value posted into
 * the [StateFlow] the returned [State] will be updated causing recomposition of every [State.value]
 * usage.
 *
 * @sample androidx.compose.runtime.samples.StateFlowSample
 * @param context [CoroutineContext] to use for collecting.
 */
@Suppress("StateFlowValueCalledInComposition")
@Composable
public fun <T> StateFlow<T>.collectAsState(
    context: CoroutineContext = EmptyCoroutineContext
): State<T> = collectAsState(value, context)

/**
 * Collects values from this [Flow] and represents its latest value via [State]. Every time there
 * would be new value posted into the [Flow] the returned [State] will be updated causing
 * recomposition of every [State.value] usage.
 *
 * @sample androidx.compose.runtime.samples.FlowWithInitialSample
 * @param initial the value of the state will have until the first flow value is emitted.
 * @param context [CoroutineContext] to use for collecting.
 */
@Composable
public fun <T : R, R> Flow<T>.collectAsState(
    initial: R,
    context: CoroutineContext = EmptyCoroutineContext,
): State<R> =
    produceState(initial, this, context) {
        if (context == EmptyCoroutineContext) {
            collect { value = it }
        } else withContext(context) { collect { value = it } }
    }

/**
 * Create a [Flow] from observable [Snapshot] state. (e.g. state holders returned by
 * [mutableStateOf][androidx.compose.runtime.mutableStateOf].)
 *
 * [snapshotFlow] creates a [Flow] that runs [block] when collected and emits the result, recording
 * any snapshot state that was accessed. While collection continues, if a new [Snapshot] is applied
 * that changes state accessed by [block], the flow will run [block] again, re-recording the
 * snapshot state that was accessed. If the result of [block] is not [equal to][Any.equals] the
 * previous result, the flow will emit that new result. (This behavior is similar to that of
 * [Flow.distinctUntilChanged][kotlinx.coroutines.flow.distinctUntilChanged].) Collection will
 * continue indefinitely unless it is explicitly cancelled or limited by the use of other [Flow]
 * operators.
 *
 * @sample androidx.compose.runtime.samples.snapshotFlowSample
 *
 * [block] is run in a **read-only** [Snapshot] and may not modify snapshot data. If [block]
 * attempts to modify snapshot data, flow collection will fail with [IllegalStateException].
 *
 * [block] may run more than once for equal sets of inputs or only once after many rapid snapshot
 * changes; it should be idempotent and free of side effects.
 *
 * When working with [Snapshot] state it is useful to keep the distinction between **events** and
 * **state** in mind. [snapshotFlow] models snapshot changes as events, but events **cannot** be
 * effectively modeled as observable state. Observable state is a lossy compression of the events
 * that produced that state.
 *
 * An observable **event** happens at a point in time and is discarded. All registered observers at
 * the time the event occurred are notified. All individual events in a stream are assumed to be
 * relevant and may build on one another; repeated equal events have meaning and therefore a
 * registered observer must observe all events without skipping.
 *
 * Observable **state** raises change events when the state changes from one value to a new, unequal
 * value. State change events are **conflated;** only the most recent state matters. Observers of
 * state changes must therefore be **idempotent;** given the same state value the observer should
 * produce the same result. It is valid for a state observer to both skip intermediate states as
 * well as run multiple times for the same state and the result should be the same.
 */
public fun <T> snapshotFlow(block: () -> T): Flow<T> = flow {
    // Objects read the last time block was run
    val readSet = MutableScatterSet<Any>()
    val readObserver: (Any) -> Unit = {
        if (it is StateObjectImpl) {
            it.recordReadIn(ReaderKind.SnapshotFlow)
        }
        readSet.add(it)
    }

    // This channel may not block or lose data on a trySend call.
    val appliedChanges = Channel<Set<Any>>(Channel.UNLIMITED)

    // Register the apply observer before running for the first time
    // so that we don't miss updates.
    val unregisterApplyObserver =
        Snapshot.registerApplyObserver { changed, _ ->
            val maybeObserved =
                changed.fastAny { it !is StateObjectImpl || it.isReadIn(ReaderKind.SnapshotFlow) }

            if (maybeObserved) {
                appliedChanges.trySend(changed)
            }
        }

    try {
        var lastValue =
            Snapshot.takeSnapshot(readObserver).run {
                try {
                    enter(block)
                } finally {
                    dispose()
                }
            }
        emit(lastValue)

        while (true) {
            var found = false
            var changedObjects = appliedChanges.receive()

            // Poll for any other changes before running block to minimize the number of
            // additional times it runs for the same data
            while (true) {
                // Assumption: readSet will typically be smaller than changed set
                found = found || readSet.intersects(changedObjects)
                changedObjects = appliedChanges.tryReceive().getOrNull() ?: break
            }

            if (found) {
                readSet.clear()
                val newValue =
                    Snapshot.takeSnapshot(readObserver).run {
                        try {
                            enter(block)
                        } finally {
                            dispose()
                        }
                    }

                if (newValue != lastValue) {
                    lastValue = newValue
                    emit(newValue)
                }
            }
        }
    } finally {
        unregisterApplyObserver.dispose()
    }
}

private fun MutableScatterSet<Any>.intersects(set: Set<Any>) = any { it in set }
